package cn.zhaopin.starter.mq.support;

import cn.zhaopin.starter.mq.common.PulsarMessage;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import org.apache.pulsar.client.api.schema.SchemaDefinition;
import org.apache.pulsar.client.api.schema.SchemaReader;
import org.apache.pulsar.client.api.schema.SchemaWriter;
import org.apache.pulsar.client.impl.schema.AvroBaseStructSchema;
import org.apache.pulsar.client.impl.schema.util.SchemaUtil;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;

/**
 * Description: 自定义json schema
 *
 * @author: zuomin (myleszelic@outlook.com)
 * @date: 2021/07/22-10:00
 */
@SuppressWarnings({"rawtypes", "unchecked"})
public class JSON2Schema extends AvroBaseStructSchema<PulsarMessage> {

    private static final Logger log = LoggerFactory.getLogger(JSON2Schema.class);
    private static final ThreadLocal<ObjectMapper> JSON_MAPPER = ThreadLocal.withInitial(() -> {
        ObjectMapper objectMapper = new ObjectMapper();
        // 支持 LocalDate、LocalDateTime的序列号
        objectMapper.disable(SerializationFeature.WRITE_DATE_KEYS_AS_TIMESTAMPS);
        objectMapper.registerModule(new JavaTimeModule());
        // 指定要序列化的域，field,get和set,以及修饰符范围，ANY是都有包括private和public
        objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        // 指定序列化输入的类型，类必须是非final修饰的，final修饰的类，比如String,Integer等会跑出异常
        // objectMapper.activateDefaultTyping(LaissezFaireSubTypeValidator.instance, ObjectMapper.DefaultTyping.NON_FINAL);
        // json进行换行缩进等操作
        // objectMapper.enable(SerializationFeature.INDENT_OUTPUT);
        // 如果json中有新增的字段并且是实体类类中不存在的，不报错
        objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
        // null 不序列化
        objectMapper.setSerializationInclusion(JsonInclude.Include.NON_NULL);
        return objectMapper;
    });

    private JSON2Schema(SchemaInfo schemaInfo, Class clazz, SchemaReader reader, SchemaWriter writer) {
        super(schemaInfo);
        this.setWriter(writer);
        this.setReader(reader);
    }

    public static JSON2Schema of(SchemaDefinition schemaDefinition) {
        SchemaReader reader = (SchemaReader) schemaDefinition.getSchemaReaderOpt().orElseGet(() -> new Jackson2JsonReader(JSON_MAPPER.get()));
        SchemaWriter writer = (SchemaWriter) schemaDefinition.getSchemaWriterOpt().orElseGet(() -> new Jackson2JsonWriter(JSON_MAPPER.get()));
        return new JSON2Schema(SchemaUtil.parseSchemaInfo(schemaDefinition, SchemaType.JSON), schemaDefinition.getPojo(), reader, writer);
    }

    public static JSON2Schema of(Class pojo) {
        return of(SchemaDefinition.builder().withPojo(pojo).build());
    }

    public static JSON2Schema of(Class pojo, Map<String, String> properties) {
        return of(SchemaDefinition.builder().withPojo(pojo).withProperties(properties).build());
    }

    public static ObjectMapper getObjectMapper() {
        return JSON_MAPPER.get();
    }
}
